Fork me on GitHub

并发容器与框架

注意:所有文章除特别说明外,转载请注明出处.

第六章 Java并发容器和框架

[TOC]

6.1 ConcurrentHashMap的实现原理和使用

ConcurrentHashMap是线程安全且高效的HashMap。

6.1.1 为何使用ConcurrentHashMap

在并发编程中使用HashMap可能会导致死循环,而使用线程安全的HashTable效率非常低下,基于上面两个原因,所以有了ConcurrentHashMap。

1.线程不安全的HashMap

多线程环境下,使用HashMap进行put操作会导致死循环,导致CPU利用率接近1。

2.效率低下的HashTable

HashTable容器使用synchronize来保证线程安全,但是在竞争激励的情况下HashTable效率低下。当一个线程访问HashTable的同步方法,其它线程访问将会进入阻塞或者是轮询状态。

3.ConcurrentHashMap的锁分段技术可有效提升并发访问率

提示:HashTable在并发环境下表现效率低下的原因是因为所有访问HashTable的线程都必须要竞争同一把锁,如果容器中存在多把锁,每一把锁用于锁定容器中的一部分数据,那么多线程访问容器里不同数据段的数据时,线程间就不存在锁竞争(锁分段技术)。

锁分段技术:将数据分成一段一段的存储,然后给每一段数据配一把锁,当一个线程占用锁访问其中一个段数据时,其它段的数据也能被其它线程访问。

6.1.2 ConcurrentHashMap的结构

ConcurrentHashMap是由 Segment数组结构 和 HashEntry数组结构 组成。Segment数组结构 是一种可重入锁(ReentrantLock),在ConcurrentHashMap里扮演锁的角色。HashEntry 则用于存储键值数据。

一个ConcurrentHashMap里包含一个 Segment数组,Segment的结构和HashMap类似,是一种数组和链表结构。一个Segment包含一个HashEntry数组,每个HashEntry是一个链表结构的元素。

6.1.3 ConcurrentHashMap初始化

ConcurrentHashMap初始化方法是通过 initialCapacity loadFactor concurrencyLevel等几个参数来初始化segment数组、段偏移量segmentshift、段掩码segmentMask和每个segment里的HashEntry数组实现。

6.1.4 定位Segment

因为ConcurrentHashMap是通过分段锁Segment来保护不同段的数据,那么在插入和获取元素的时候,必须先通过散列算法定位到Segment。

6.1.5 ConcurrentHashMap的操作

1.get操作

Segment的get操作非常高效和简单。先通过一次再散列,然后使用这个散列值通过散列运算定位到Segment,再通过散列算法定位到元素。

2.put操作

由于put方法需要对共享变量进行写入操作,所以为了线程安全,在操作共享变量时必须加锁。

3.size操作

如果需要统计整个ConcurrentHashMap中元素的大小,需要统计所有Segment里元素的大小后求和。

6.2 ConcurrentLinkedQueue

在并发编程环境下,需要使用线程安全的队列。如果要实现一个线程安全的队列有两种方式:1.使用阻塞算法。2.使用非阻塞算法。

1.使用阻塞算法的队列可以用一个锁(入队和出队用同一把锁)或者两把锁。

2.非阻塞实现方式则可以使用循环CAS的方式来实现。

ConcurrentLinkedQueue 是一个基于链接节点的无界线程安全队列,采用先进先出的规则对节点进行排序,当添加一个元素的时候,它会添加到队列的尾部。当获取一个元素时,会返回队列的头部。采用CAS算法实现。

6.2.1 ConcurrentLinkedQueue的结构

ConcurrentLinkedQueue 由head节点和tail节点组成,每个节点Node由节点元素(item)和指向下一个节点(next)的引用组成,节点与节点之间就是通过next关联起来,从而组成一张链表结构的队列。

6.2.2 入队列

入队列就是将入队节点添加到队列的尾部。

6.2.3 出队列

出队列就是从队列中返回一个结点元素,并清空该节点对元素的引用。

6.3 Java阻塞队列

6.3.1 什么是阻塞队列

阻塞队列,指一个支持两个附加操作的队列。这两个附件操作支持阻塞的插入和移除操作。

1.支持阻塞的插入方法:当队列满时,队列会阻塞插入元素的线程,直到队列不满。

2.支持阻塞的移除方法:对队列为空时,获取元素的线程会等待队列变为非空。

提示:阻塞队列常用于生产者和消费者的场景。

注意:如果是无界阻塞队列,队列不可能会出现满的情况,所以使用put或offer方法永远不会被阻塞,而且使用offer方法时,该方法永远返回true。

6.3.2 Java里的阻塞队列

JDK 7 提供7个阻塞队列:

1.ArrayBlockingQueue 数组组成的有界阻塞队列,按照先进先出的原则对元素进行排序。默认情况不保证线程公平的访问队列。

    //在初始化时指定阻塞队列的容量和公平性
    ArrayBlockingQueue(int capacity, boolean fair);

提示:公平访问队列是指,阻塞的线程可以按照阻塞的先后顺序访问队列,先阻塞的线程先访问队列。非公平性是指,阻塞的线程可以争夺访问队列的资格。

2.LinkedBlockingQueue 链表实现的有界阻塞队列

3.PriorityBlockingQueue 是一个支持优先级的无界阻塞队列。默认情况采取自然升序排列。

4.DelayQueue 支持延时获取元素的无界阻塞队列

5.SynchronousQueue 不存储元素的阻塞队列,每一个put操作都必须等待一个take操作,否则不能继续添加元素。

    //可以指定公平性策略
    public SynchronousQueue(boolean fair);

6.LinkedTransferQueue

7.LinkedBlockingDeque

6.3.3 阻塞队列的实现原理

通知模式

当生产者往满的队列里添加元素时会阻塞生产者,当消费者消费队列中的一元素之后会通知生产者当前队列可用。

6.4 Fork | Join框架

6.4.1 Fork/Join框架简介

并行执行任务框架,将一个大任务分割成若干个小任务,最终汇总每个任务结果后得到大任务结果的框架。

Fork是将大任务切分成若干小任务并行的执行,Join是合并这些子任务的执行结果,最后得到大任务的结果。

6.4.2 工作窃取算法

该算法表示某个线程从其它队列里窃取任务来执行。此时会发生窃取任务线程和被窃取任务线程之间的竞争,为了解决这个问题,使用双端队列,被窃取的任务线程永远从双端队列的头部拿任务执行,而窃取任务的线程永远从双端队列的尾部拿任务执行。

1.优点:充分利用线程进行并行运算,减少线程间的竞争。

2.缺点:某些情况下还是存在竞争。如:双端队列中只有一个任务时。

6.4.3 Fork/Join框架的设计

1.分割任务

2.执行任务并合并结果

那么这里Fork/Join使用两个类来完成上面两件事情:

1.ForkJoinTask:我们要使用ForkJoin框架,必须首先创建一个ForkJoin任务。它提供在任务中执行fork()和join()操作的机制。

提示:ForkJoinTask与一般任务的主要区别在于它需要实现compute()方法。在此方法中首先需要判断任务足够小,才执行当前子任务并返回结果。

    在这里不需要直接继承ForkJoinTask类,只需要继承它的子类,Fork/Join框架提供以下两个子类:

        1.RecursiveAction:用于没有返回结果的任务。

        2.RecursiveTask:用于有返回结果的任务。

2.ForkJoinPool:ForkJoinTask需要通过ForkJoinPool来执行。

6.4.4 使用Fork/Join框架

package Java.ChapterSeven.ForkJoin;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;

public class ForkJoinExample extends RecursiveTask<Integer> {

    private final int threshold = 5;
    private int firstOne;
    private int lastOne;

    public ForkJoinExample(int firstOne, int lastOne) {
        this.firstOne = firstOne;
        this.lastOne = lastOne;
    }

    @Override
    protected Integer compute() {
        int result = 0;
        //任务小可以直接计算
        if (lastOne - firstOne <= threshold){
            for (int i = firstOne; i <= lastOne; i++){
                result += i;
            }
        }else {
            //拆分成小任务
            int middle = firstOne + (lastOne - firstOne) / 2;
            ForkJoinExample leftTask = new ForkJoinExample(firstOne, lastOne);
            ForkJoinExample rightTask = new ForkJoinExample(middle + 1, lastOne);
            leftTask.fork();
            rightTask.fork();

            result = leftTask.join() + rightTask.join();
        }

        return result;
    }

    public static void main(String[] args) {
        ForkJoinExample example = new ForkJoinExample(1, 1000);
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        Future<Integer> result = forkJoinPool.submit(example);
        try {
            System.out.println(result.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }
}

6.4.5 Fork/Join框架的异常处理和实现原理

ForkJoinTask在执行的时候可能会抛出异常,但是我们没办法在主线程里直接捕获异常,所以ForkJoinTask提供isCompletedAbnormally()方法检查任务是否已经抛出异常或者被取消了,并且可以通过ForkJoinTask的getException()方法获取异常。

ForkJoinPool由ForkJoinTask数组和ForkJoinWorkerThread数组组成,ForkJoinTask数组负责将存放程序提交给ForkJoinPool的任务,而ForkJoinWorkerThread数组负责执行这些任务。

6.4.6 Fork/Join框架的实现原理

ForkJoinPool由ForkJoinTask数组和ForkJoinWorkerThread数组组成,ForkJoinTask数组负责存放程序提交给ForkJoinPool的任务。ForkJoinWorkerThread负责执行这些任务。

本文标题:并发容器与框架

文章作者:Bangjin-Hu

发布时间:2019年10月15日 - 09:22:26

最后更新:2020年03月30日 - 08:18:32

原始链接:http://bangjinhu.github.io/undefined/第6章 Java并发容器和框架/

许可协议: 署名-非商业性使用-禁止演绎 4.0 国际 转载请保留原文链接及作者。

Bangjin-Hu wechat
欢迎扫码关注微信公众号,订阅我的微信公众号.
坚持原创技术分享,您的支持是我创作的动力.